Java: backpressure does not work with the groupBy operator

I am trying to apply backpressure in my code, but it is not working. I tried the example code given here, and it appears to work:

Flux.range(1, 100)
    .doOnNext(d -> getLogger().info("receive record ::: {}", d))
    // concurrency 1: at most one inner Mono is subscribed at a time
    .flatMap(recordFlux -> Mono.delay(Duration.ofSeconds(30))
                               .doOnNext(d -> getLogger().info("processed message :: {}", recordFlux))
                               .then(Mono.just(recordFlux)),
             1)
    .subscribe();

This is the output I get:

2021-11-02 15:34:25.678  INFO 7456 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 1
2021-11-02 15:34:55.682  INFO 7456 --- [     parallel-1] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 1
2021-11-02 15:34:55.684  INFO 7456 --- [     parallel-1] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 2
2021-11-02 15:35:25.685  INFO 7456 --- [     parallel-2] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 2
2021-11-02 15:35:25.686  INFO 7456 --- [     parallel-2] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 3
2021-11-02 15:35:55.687  INFO 7456 --- [     parallel-3] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 3
2021-11-02 15:35:55.687  INFO 7456 --- [     parallel-3] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 4
2021-11-02 15:36:25.690  INFO 7456 --- [     parallel-4] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 4
2021-11-02 15:36:25.691  INFO 7456 --- [     parallel-4] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 5
2021-11-02 15:36:55.697  INFO 7456 --- [     parallel-5] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 5
2021-11-02 15:36:55.698  INFO 7456 --- [     parallel-5] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 6
2021-11-02 15:37:25.704  INFO 7456 --- [     parallel-6] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 6
2021-11-02 15:37:25.704  INFO 7456 --- [     parallel-6] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 7
2021-11-02 15:37:55.714  INFO 7456 --- [     parallel-7] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 7
2021-11-02 15:37:55.714  INFO 7456 --- [     parallel-7] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 8
2021-11-02 15:38:25.720  INFO 7456 --- [     parallel-8] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 8
2021-11-02 15:38:25.720  INFO 7456 --- [     parallel-8] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 9
2021-11-02 15:38:55.723  INFO 7456 --- [     parallel-9] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 9
2021-11-02 15:38:55.723  INFO 7456 --- [     parallel-9] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 10
2021-11-02 15:39:25.726  INFO 7456 --- [    parallel-10] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 10

But when I add an extra groupBy before the flatMap, it stops working. What effect does adding groupBy have? How can I achieve the result above with the code below?

Flux.range(1, 100)
    .doOnNext(d -> getLogger().info("receive record ::: {}", d))
    .groupBy(m -> 1)  // every record goes into a single group with key 1
    // outer flatMap: concurrency 1, prefetch 1
    .flatMap(consumerRecordFlux -> consumerRecordFlux
                 .doOnNext(a -> getLogger().info("before process message :: partition ::{}, record ::{}", consumerRecordFlux.key(), a))
                 // inner flatMap: concurrency 1, prefetch 1
                 .flatMap(b -> Mono.delay(Duration.ofSeconds(30))
                                   .doOnNext(d -> getLogger().info("processed message :: {}", b))
                                   .then(Mono.just(b)),
                          1, 1),
             1, 1)
    .subscribe();

This is the incorrect output I get from the code above:

2021-11-02 15:51:44.827  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 1
2021-11-02 15:51:44.829  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : before process message :: partition ::1, record ::1
2021-11-02 15:51:44.833  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 2
2021-11-02 15:51:44.833  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 3
2021-11-02 15:51:44.833  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 4
2021-11-02 15:51:44.833  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 5
2021-11-02 15:51:44.833  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 6
2021-11-02 15:51:44.833  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 7
2021-11-02 15:51:44.833  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 8
2021-11-02 15:51:44.833  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 9
2021-11-02 15:51:44.833  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 10
2021-11-02 15:51:44.833  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 11
2021-11-02 15:51:44.834  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 12
2021-11-02 15:51:44.834  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 13
2021-11-02 15:51:44.834  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 14
2021-11-02 15:51:44.834  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 15
2021-11-02 15:51:44.834  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 16
2021-11-02 15:51:44.834  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 17
2021-11-02 15:51:44.834  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 18
2021-11-02 15:51:44.834  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 19
2021-11-02 15:51:44.834  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 20
2021-11-02 15:51:44.834  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 21
2021-11-02 15:51:44.834  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 22
2021-11-02 15:51:44.834  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 23
2021-11-02 15:51:44.834  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 24
2021-11-02 15:51:44.834  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 25
2021-11-02 15:51:44.834  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 26
2021-11-02 15:51:44.834  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 27
2021-11-02 15:51:44.835  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 28
2021-11-02 15:51:44.835  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 29
2021-11-02 15:51:44.835  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 30
2021-11-02 15:51:44.835  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 31
2021-11-02 15:51:44.835  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 32
2021-11-02 15:51:44.835  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 33
2021-11-02 15:51:44.835  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 34
2021-11-02 15:51:44.835  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 35
2021-11-02 15:51:44.835  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 36
2021-11-02 15:51:44.835  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 37
2021-11-02 15:51:44.835  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 38
2021-11-02 15:51:44.835  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 39
2021-11-02 15:51:44.835  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 40
2021-11-02 15:51:44.835  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 41
2021-11-02 15:51:44.835  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 42
2021-11-02 15:51:44.835  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 43
2021-11-02 15:51:44.835  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 44
2021-11-02 15:51:44.836  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 45
2021-11-02 15:51:44.836  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 46
2021-11-02 15:51:44.836  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 47
2021-11-02 15:51:44.836  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 48
2021-11-02 15:51:44.836  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 49
2021-11-02 15:51:44.836  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 50
2021-11-02 15:51:44.836  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 51
2021-11-02 15:51:44.836  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 52
2021-11-02 15:51:44.836  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 53
2021-11-02 15:51:44.836  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 54
2021-11-02 15:51:44.836  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 55
2021-11-02 15:51:44.837  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 56
2021-11-02 15:51:44.837  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 57
2021-11-02 15:51:44.837  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 58
2021-11-02 15:51:44.837  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 59
2021-11-02 15:51:44.837  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 60
2021-11-02 15:51:44.837  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 61
2021-11-02 15:51:44.837  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 62
2021-11-02 15:51:44.837  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 63
2021-11-02 15:51:44.837  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 64
2021-11-02 15:51:44.838  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 65
2021-11-02 15:51:44.838  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 66
2021-11-02 15:51:44.838  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 67
2021-11-02 15:51:44.838  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 68
2021-11-02 15:51:44.838  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 69
2021-11-02 15:51:44.838  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 70
2021-11-02 15:51:44.838  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 71
2021-11-02 15:51:44.838  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 72
2021-11-02 15:51:44.838  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 73
2021-11-02 15:51:44.838  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 74
2021-11-02 15:51:44.838  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 75
2021-11-02 15:51:44.838  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 76
2021-11-02 15:51:44.838  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 77
2021-11-02 15:51:44.839  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 78
2021-11-02 15:51:44.839  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 79
2021-11-02 15:51:44.839  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 80
2021-11-02 15:51:44.839  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 81
2021-11-02 15:51:44.839  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 82
2021-11-02 15:51:44.839  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 83
2021-11-02 15:51:44.839  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 84
2021-11-02 15:51:44.839  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 85
2021-11-02 15:51:44.839  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 86
2021-11-02 15:51:44.839  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 87
2021-11-02 15:51:44.839  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 88
2021-11-02 15:51:44.839  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 89
2021-11-02 15:51:44.840  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 90
2021-11-02 15:51:44.840  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 91
2021-11-02 15:51:44.840  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 92
2021-11-02 15:51:44.840  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 93
2021-11-02 15:51:44.840  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 94
2021-11-02 15:51:44.840  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 95
2021-11-02 15:51:44.840  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 96
2021-11-02 15:51:44.840  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 97
2021-11-02 15:51:44.840  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 98
2021-11-02 15:51:44.840  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 99
2021-11-02 15:51:44.840  INFO 8071 --- [           main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 100
2021-11-02 15:52:14.836  INFO 8071 --- [     parallel-1] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 1
2021-11-02 15:52:14.837  INFO 8071 --- [     parallel-1] c.k.l.ConductorIndexUpdatesKafkaConsumer : before process message :: partition ::1, record ::2
2021-11-02 15:52:44.843  INFO 8071 --- [     parallel-2] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 2
2021-11-02 15:52:44.844  INFO 8071 --- [     parallel-2] c.k.l.ConductorIndexUpdatesKafkaConsumer : before process message :: partition ::1, record ::3
2021-11-02 15:53:14.846  INFO 8071 --- [     parallel-3] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 3
2021-11-02 15:53:14.847  INFO 8071 --- [     parallel-3] c.k.l.ConductorIndexUpdatesKafkaConsumer : before process message :: partition ::1, record ::4
2021-11-02 15:53:44.852  INFO 8071 --- [     parallel-4] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 4
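
A possible reading of the log above, offered as an assumption rather than a confirmed diagnosis: groupBy sits between the source and the inner flatMap and requests its own prefetch from upstream (by default Queues.SMALL_BUFFER_SIZE, which is 256 unless overridden), so all 100 records from range are pulled immediately and wait inside the group's queue. The inner flatMap still processes one record every 30 seconds, which matches the "processed message" lines. The toy model below is plain Java and does not use Reactor; the class PrefetchModel and its run method are hypothetical names used only for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy, single-threaded model of a prefetching stage in front of a slow consumer (not Reactor code). */
final class PrefetchModel {

    /** Runs the model and returns the ordered event log, e.g. "receive 1", "processed 1". */
    static List<String> run(int total, int prefetch) {
        if (prefetch < 1) {
            throw new IllegalArgumentException("prefetch must be >= 1");
        }
        List<String> events = new ArrayList<>();
        Queue<Integer> buffer = new ArrayDeque<>();
        int next = 1;

        // The buffering stage requests `prefetch` items up front,
        // no matter how slow the consumer behind it is.
        int demand = prefetch;

        while (next <= total || !buffer.isEmpty()) {
            // The source emits for as long as the buffering stage has outstanding demand.
            while (demand > 0 && next <= total) {
                events.add("receive " + next);
                buffer.add(next++);
                demand--;
            }
            // The slow consumer takes exactly one item at a time (concurrency 1).
            Integer item = buffer.poll();
            if (item != null) {
                events.add("processed " + item);
                demand++; // one slot freed, so one more item is requested upstream
            }
        }
        return events;
    }

    public static void main(String[] args) {
        // Large prefetch: every "receive" is logged before the first "processed".
        System.out.println(run(5, 256));
        // Prefetch of 1: "receive" and "processed" alternate.
        System.out.println(run(5, 1));
    }
}
```

If this reading is right, the overload groupBy(keyMapper, prefetch), for example groupBy(m -> 1, 1), would be the setting to try. The exact log it produces in Reactor is not verified here, and the source may still run a couple of records ahead of processing.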
